refactor: extract deduplication logic from AddRemoveDedupVisitor into embeddable FileActionsDeduplicator
#769
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #769 +/- ##
==========================================
+ Coverage 84.64% 84.69% +0.04%
==========================================
Files 82 83 +1
Lines 19735 19816 +81
Branches 19735 19816 +81
==========================================
+ Hits 16705 16783 +78
Misses 2214 2214
- Partials 816 819 +3
Pull Request Overview
This PR refactors the deduplication logic by extracting it from AddRemoveDedupVisitor into a standalone, embeddable FileActionDeduplicator to enable reuse with the upcoming CheckpointVisitor.
- Moves deduplication logic into a dedicated FileActionDeduplicator in the new log_replay module.
- Refactors AddRemoveDedupVisitor to utilize FileActionDeduplicator via index constants.
- Updates the module hierarchy in lib.rs to register the new log_replay module.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| kernel/src/log_replay.rs | Introduces FileActionDeduplicator struct to encapsulate deduplication logic. |
| kernel/src/lib.rs | Adds the new log_replay module. |
| kernel/src/scan/log_replay.rs | Refactors AddRemoveDedupVisitor to use FileActionDeduplicator and its index constants. |
Comments suppressed due to low confidence (1)
kernel/src/scan/log_replay.rs:43
- [nitpick] Consider adding inline comments for the index constants (e.g., ADD_PATH_INDEX, ADD_PARTITION_VALUES_INDEX) to clearly explain their mapping to file action fields and reduce ambiguity in future maintenance.
const ADD_PATH_INDEX: usize = 0;
zachschuermann
left a comment
generally looks good, left a few comments. can we get additional context on the material changes here? hard to see what's being changed vs what's being moved. (might even be a good idea to have a prefactor PR that just introduces the new module and moves code around as needed)
Makes sense, will update the PR description to add some context as to what's going on. Talked offline, will continue with changes in this PR as the only moving of existing code is the |
zachschuermann
left a comment
seems reasonable, just a couple comments and having more information on what moved vs what is material change will help reviews
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
/// and should process it.
pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
can we call out what is just a simple move and what is a material change? AFAICT this is code that already existed?
Directly extracted from AddRemoveDedupVisitor
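For readers following the thread, here is a minimal sketch of what the extracted method does, based on the doc comment quoted above and the later remark that keys are only saved for commit batches. The `Deduplicator` type and the simplified `FileActionKey` fields are illustrative stand-ins, not the kernel's actual definitions.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the kernel's `FileActionKey` (assumed fields:
/// a path plus an optional deletion vector unique id).
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct FileActionKey {
    pub path: String,
    pub dv_unique_id: Option<String>,
}

/// Hypothetical, pared-down deduplicator: only the shared "seen" set and
/// the flag that says whether the current batch comes from a commit file.
pub struct Deduplicator<'seen> {
    seen: &'seen mut HashSet<FileActionKey>,
    is_log_batch: bool,
}

impl<'seen> Deduplicator<'seen> {
    pub fn new(seen: &'seen mut HashSet<FileActionKey>, is_log_batch: bool) -> Self {
        Self { seen, is_log_batch }
    }

    /// Returns `true` if the key was already seen (ignore the action),
    /// `false` if it is new (process it).
    pub fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
        if self.seen.contains(&key) {
            return true;
        }
        // Assumption drawn from the review discussion: keys are recorded
        // only for commit (log) batches, not for checkpoint batches.
        if self.is_log_batch {
            self.seen.insert(key);
        }
        false
    }
}
```

Because the set is borrowed rather than owned, the same `HashSet` can be threaded through successive batches, which is what lets a later batch recognize a key recorded by an earlier one.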
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
/// of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct FileActionKey {
Moved from scan/log_replay.rs
zachschuermann
left a comment
few comments - and if we could add comments inline in the PR changes for moves/changes that would help reviewers, sorry to harp on this still but with (1) critical changes like this in log replay and (2) code movement/changing, I think we need to do everything we can to ensure thorough/effective reviews
kernel/src/scan/log_replay.rs
Outdated
if self.deduplicator.selection_vector_ref()[i] {
self.deduplicator.selection_vector_mut()[i] = self.is_valid_add(i, getters)?;
can we bundle this up in a method that checks selection vector then does a closure or something if true (and sets selection vector to closure output?)
Great idea 👍
Tried something along the lines of:
self.deduplicator.process_selection_with(|i| self.is_valid_add(i, getters))?;
The issue is that is_valid_add takes &mut self, and the closure passed to process_selection_with calls that method while self.deduplicator is already mutably borrowed, which creates a borrowing conflict.
Talked offline - selection vector does not need to be drilled into the FileActionsDeduplicator.
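A rough sketch of the shape this resolution implies, assuming the selection vector stays on the visitor and the deduplicator is just another field. All names here (`Visitor`, `Deduplicator`, the `paths` slice standing in for the getters) are hypothetical simplifications. The plain loop compiles because the right-hand side of the assignment is evaluated first, so its `&mut self` borrow has ended before the element is written.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the deduplicator; it does not own the
/// selection vector.
struct Deduplicator {
    seen: HashSet<String>,
}

/// Hypothetical visitor that keeps the selection vector itself.
struct Visitor {
    selection_vector: Vec<bool>,
    deduplicator: Deduplicator,
}

impl Visitor {
    /// Needs `&mut self` because it records keys in the deduplicator.
    /// `paths` is a simplified replacement for the real row getters.
    fn is_valid_add(&mut self, i: usize, paths: &[&str]) -> Result<bool, String> {
        let path = paths
            .get(i)
            .ok_or_else(|| format!("row {i} out of range"))?;
        // `insert` returns true only the first time a path is seen.
        Ok(self.deduplicator.seen.insert(path.to_string()))
    }

    fn visit(&mut self, paths: &[&str]) -> Result<(), String> {
        for i in 0..self.selection_vector.len() {
            if self.selection_vector[i] {
                // The call on the right finishes before the element on the
                // left is borrowed for writing, so the borrows never overlap.
                self.selection_vector[i] = self.is_valid_add(i, paths)?;
            }
        }
        Ok(())
    }
}
```

By contrast, a helper on the deduplicator that accepts a closure calling `self.is_valid_add(...)` would need `self.deduplicator` and all of `self` borrowed mutably at the same time, which the borrow checker rejects.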
// Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may
// have a remove with a path at index 4. In either case, extract the three dv getters at
// indexes that immediately follow a valid path index.
can we persist the comment still? seems useful to include the context on indexes
Sounds good, will make the docs a bit more clear as well
/// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
/// * `Ok(None)` - When no file action is found
/// * `Err(...)` - On any error during extraction
pub(crate) fn extract_file_action<'a>(
did we not have any specific UT on these in the old module? might be worth introducing a handful of quick tests here..
the original module didn't have specific unit tests for these functions, as they were primarily covered by integration tests at a higher level. Since this PR is focused purely on refactoring for code organization without functional changes, I'd prefer to keep tests separate to maintain a clear scope for reviewers.
sure - can we track a follow-up then?
👍 #780
Hello @sebastiantia and @zachschuermann, I am new to the open source community. I was told that the Rust community is very supportive and helpful. I was wondering if I could try to write these tests. I joined the Slack channel, but the last activity there was 12 March.
👋 Hi @snackoverflow215! We certainly aim to be supportive and helpful :) - please feel free to add some tests for this, that would be great! I saw you opened #783 already, I'll take a look today
dv_getters[2].get_opt(i, "deletionVector.offset")?,
)),
None => None,
};
All of this logic is used to generate the file action key - which is now abstracted and supplied by the FileActionsDeduplicator.extract_file_action() method
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
/// and should process it.
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
This method has been moved to the FileActionDeduplicator
}

/// Extract the deletion vector unique ID if it exists.
fn extract_dv_unique_id<'a>(
An internal refactor, only called from extract_file_action to generate the file action key from a row of EngineData
can we document some of the requirements here (e.g. you give a dv_start_index and then it uses the getters at that index, + 1, and + 2 to extract the storage type, dv, and offset fields.)
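To illustrate the indexing contract being asked about, here is a simplified sketch. It replaces the kernel's getters with a hypothetical flattened row (`&[Option<Field>]`), and the unique id format (storage type, then path or inline DV, then an optional `@offset` suffix) is an assumption rather than something confirmed in this thread.

```rust
/// Hypothetical flattened row value; the real code reads through getters.
#[derive(Clone, Debug, PartialEq)]
enum Field {
    Str(String),
    Int(i32),
}

/// Fetch the field at `idx`, failing if the row is too short.
fn field_at(row: &[Option<Field>], idx: usize) -> Result<&Option<Field>, String> {
    row.get(idx).ok_or_else(|| format!("no getter at index {idx}"))
}

/// Reads three consecutive fields starting at `dv_start_index`:
///   dv_start_index     -> storage type (absent means "no deletion vector")
///   dv_start_index + 1 -> path or inline DV (required once storage type is set)
///   dv_start_index + 2 -> offset (optional)
fn extract_dv_unique_id(
    row: &[Option<Field>],
    dv_start_index: usize,
) -> Result<Option<String>, String> {
    let storage_type = match field_at(row, dv_start_index)? {
        Some(Field::Str(s)) => s,
        None => return Ok(None),
        Some(other) => return Err(format!("unexpected storage type: {other:?}")),
    };
    let path_or_inline = match field_at(row, dv_start_index + 1)? {
        Some(Field::Str(s)) => s,
        other => return Err(format!("unexpected path or inline DV: {other:?}")),
    };
    let offset = match field_at(row, dv_start_index + 2)? {
        Some(Field::Int(n)) => Some(*n),
        None => None,
        Some(other) => return Err(format!("unexpected offset: {other:?}")),
    };
    // Assumed id format, used here only for illustration.
    Ok(Some(match offset {
        Some(o) => format!("{storage_type}{path_or_inline}@{o}"),
        None => format!("{storage_type}{path_or_inline}"),
    }))
}
```

With the layout described in the code comment earlier in the thread (add path at index 0, remove path at index 4), the caller would pass `dv_start_index = 1` for an add and `5` for a remove.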
nicklan
left a comment
generally looks good, just some small comments. thanks!
self.seen.insert(key);
}
false
// The index position in the row getters for the following columns
Boy we need a way to get these out of the schema and not to manually encode them :)
Agreed!
///
/// # Returns
///
/// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
why not move the is_add inside FileActionKey? Even more rusty would be to have a quick:
enum FileActionKeyType {
    Add,
    Remove,
}
and have that be part of the above struct.
If you want to avoid code changes in a refactor that makes sense, maybe just make a follow-up issue?
Good idea 👍 will track with a follow-up issue
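One possible shape for that follow-up, sketched under assumptions: the field names and the `action_type` member are hypothetical, and the manual `Hash`/`Eq` impls reflect a design concern rather than anything decided in this thread. If the action type took part in the derived equality, an add and a remove for the same file would stop colliding in the seen set, so this sketch deliberately leaves it out of the comparison.

```rust
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FileActionKeyType {
    Add,
    Remove,
}

/// Hypothetical variant of the key that carries its action type.
#[derive(Debug)]
struct FileActionKey {
    path: String,
    dv_unique_id: Option<String>,
    action_type: FileActionKeyType,
}

// Equality and hashing intentionally ignore `action_type`, so that an add
// and a remove of the same (path, dv id) pair are treated as duplicates.
impl PartialEq for FileActionKey {
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path && self.dv_unique_id == other.dv_unique_id
    }
}

impl Eq for FileActionKey {}

impl Hash for FileActionKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
        self.dv_unique_id.hash(state);
    }
}
```

An alternative that avoids the custom impls is to keep the key as it is and return the enum alongside it, for example `(FileActionKey, FileActionKeyType)` in place of the current `(key, is_add)` pair.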
nicklan
left a comment
lgtm! thanks
zachschuermann
left a comment
very nice :) considering the sensitivity of this area i've tried to give it a thorough review! just left a few more nits/questions and would like to have us take up some of those UT additions soon
LGTM!
impl<'seen> FileActionDeduplicator<'seen> {
    pub(crate) fn new(
        seen_file_keys: &'seen mut HashSet<FileActionKey>,
        is_log_batch: bool,
thinking out loud: we could actually make this something like "deduplicate_batch: bool" or "save_batch: bool" ? baking in less checkpoint/commit specific logic and instead just communicating whether or not we should save the file actions in the hashmap for future deduplication (i.e. we do this in commit batches but not checkpoint batches)
let's not solve that here, perhaps talk about little rename in the future
Left a todo...
// TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch` to better reflect its role in deduplication logic.
pub mod engine_data;
pub mod error;
pub mod expressions;
pub mod log_replay;
just noticing (sorry this snuck through somehow) this shouldn't be pub
## What changes are proposed in this pull request?

### Key changes

resolves #737.

This PR implements the `CheckpointVisitor` necessary for filtering a stream of actions into a stream of actions to be included in a checkpoint file. This leverages the `FileActionDeduplicator` [[link to PR]](#769).

This PR introduces the `checkpoint` mod, and implements the visitor in the new `checkpoint/log_replay` mod. Comprehensive module docs are included in the new modules; they give an overview of the incoming code additions and their goal.

### Checkpoint Content

A **complete V1 checkpoint** encapsulates:

1. All FILE actions that make up the state of a version of a table:
   - Add actions (after action reconciliation)
   - Unexpired remove actions ([remove tombstones](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file))
2. All NON-FILE actions that make up the state of a version of a table:
   - Protocol action
   - Metadata action
   - Txn actions

A **single-file V2 checkpoint** is simply a super-set of the actions included in the V1 checkpoint schema, with the addition of the `CheckpointMetadata` action (which must be generated on every write). Since single-file V2 checkpoints will also leverage this visitor, we have chosen to name it the general `CheckpointVisitor`.

Note:
- CDC, CommitInfo, Sidecar, and CheckpointMetadata actions are NOT part of the **V1** checkpoint schema.
- Sidecar and CheckpointMetadata actions are part of the **V2** checkpoint schema.

### The new `CheckpointVisitor`

This visitor selects the **FILE** actions for a V1 spec checkpoint via a selection vector:

1. Processes add/remove actions with proper deduplication based on path and deletion vector ID pairs
2. Optimization: Only tracks already seen file paths in **commit files**, as actions in checkpoint files are the last batches to be processed, and do not conflict with other actions in checkpoint files.
3. Applies tombstone expiration logic by filtering out remove actions with deletion timestamps older than the minimum file retention timestamp

This visitor also selects the **NON-FILE** actions for a V1 spec checkpoint via a selection vector:

1. Ensures exactly one protocol action is included (the newest one encountered)
2. Ensures exactly one metadata action is included (the newest one encountered)
3. Deduplicates transaction (txn) actions by app ID to include only the newest action for each app ID

## How was this change tested?

- `test_checkpoint_visitor` - Tests basic functionality with both file and non-file actions, verifying correct counts and selection vector.
- `test_checkpoint_visitor_boundary_cases_for_tombstone_expiration` - Tests how tombstone expiration handles threshold boundary conditions.
- `test_checkpoint_visitor_conflicting_file_actions_in_log_batch` - Verifies duplicate path handling in log batches (keeping first, skipping second).
- `test_checkpoint_visitor_file_actions_in_checkpoint_batch` - Tests that duplicate file actions are included in checkpoint batches.
- `test_checkpoint_visitor_conflicts_with_deletion_vectors` - Tests file deduplication with deletion vectors to ensure uniqueness.
- `test_checkpoint_visitor_already_seen_non_file_actions` - Verifies that pre-populated actions are skipped correctly.
- `test_checkpoint_visitor_duplicate_non_file_actions` - Tests deduplication of non-file actions (protocol, metadata, transactions).
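The non-file action rules listed in the description above (one protocol, one metadata, newest txn per app ID) can be sketched roughly as follows. This assumes actions arrive newest first, so the first one encountered is the one to keep. The `Action` enum and `NonFileState` struct are hypothetical simplifications, not the visitor's real types.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified non-file action.
enum Action {
    Protocol,
    Metadata,
    Txn { app_id: String },
}

/// State carried across batches: what has already been selected.
#[derive(Default)]
struct NonFileState {
    seen_protocol: bool,
    seen_metadata: bool,
    seen_txn_app_ids: HashSet<String>,
}

impl NonFileState {
    /// Returns `true` if the action should be selected for the checkpoint.
    /// Assumes actions are visited newest first.
    fn select(&mut self, action: &Action) -> bool {
        match action {
            // Keep only the first protocol / metadata action encountered.
            Action::Protocol => !std::mem::replace(&mut self.seen_protocol, true),
            Action::Metadata => !std::mem::replace(&mut self.seen_metadata, true),
            // `insert` returns true only for an app ID not seen before.
            Action::Txn { app_id } => self.seen_txn_app_ids.insert(app_id.clone()),
        }
    }
}
```

Pre-populating the state (for example setting `seen_protocol` to `true` before a batch) models the case where an earlier batch already supplied that action.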
What changes are proposed in this pull request?
No behavioral changes were introduced; this is purely a refactoring effort.
This PR extracts the core deduplication logic from the `AddRemoveDedupVisitor` so that it can be shared with the incoming `CheckpointVisitor`. For a bigger-picture view of how this refactor is helpful, please take a look at the following PR, which implements the `CheckpointVisitor` with an embedded `FileActionDeduplicator` and will be rebased onto this PR once it is merged. [link to PR].
This `FileActionDeduplicator` lives in the new top-level `log_replay` mod, as it will be leveraged in the nested `scan/log_replay` mod and the incoming `checkpoints/log_replay` mod. There are also additional traits & structs that the two `log_replay` implementations will share via this new top-level mod. For an even wider view of the implementation of the `checkpoints` mod and the component re-use, please have a look at the following PR. [link to PR]
Summary of refactor
- New `log_replay` mod
- Moved the `FileActionKey` definition from `scan/log_replay` to the new `log_replay` mod
- New `FileActionDeduplicator` in the new `log_replay` mod, with:
  - the `check_and_record_seen` method, which was simply moved from the `AddRemoveDedupVisitor`
  - the `extract_file_action` method and the private `extract_dv_unique_id` method, which may be new concepts, but both consist of functionality pulled from the `AddRemoveDedupVisitor` to be shared with the incoming `CheckpointVisitor`
How was this change tested?
All existing tests pass ✅